package com.haoziqi.chapter_05;

import com.haoziqi.chapter_02.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * description
 * created by A on 2021/3/5
 */
public class Shuffle {
    public static void main(String[] args) {
        //1.创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        DataStreamSource<String> inputDS = env.socketTextStream("hadoop102", 9999);
        SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                // 切分
                String[] line = value.split(",");
                return new WaterSensor(line[0], Long.valueOf(line[1]), Integer.valueOf(line[2]));
            }
        });
        //KeyedStream
        KeyedStream<WaterSensor, String> ks = sensorDS.keyBy(sensor -> sensor.getId());
        SingleOutputStreamOperator<WaterSensor> reduce = ks.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                System.out.println(value1+"-->>"+value2);
                return new WaterSensor(value1.getId(),value1.getTs(),value1.getVs()+value2.getVs());
            }
        });
        reduce.print();

        //提交任务
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
